1. Data preparation
The code here follows a Coursera course's code, adapted into a Chinese example to fit my own needs.
The data comes as CoNLL-U (.conllu) files, so we first arrange it with a package that parses this format (conllu).
We arrange it into the format below, taking '天氣真好' ("the weather is really nice") as an example:
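Concretely, each sentence becomes two parallel lists, both prefixed with a start token. Assuming an illustrative segmentation and tagging (not taken from the treebank), '天氣真好' would look like:
# sentences_list[i]: ['--s--', '天氣', '真', '好']
# pos_list[i]:       ['--n--', 'NOUN', 'ADV', 'ADJ']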
Import the data:
## Data preparation
!pip install conllu
from io import open
from collections import defaultdict  # used by the word-frequency count below
from conllu import parse_incr
import re
import string  # used by assign_unk below
def arrange_dataset(file_name: str, start_word_token: str, start_pos_token: str) -> dict:
    data_file = open(file_name, "r", encoding="utf-8")
    # process the dataset: collect each sentence's words and POS tags,
    # prepending a start token to both lists
    sentences_list = []
    pos_list = []
    for tokenlist in parse_incr(data_file):
        temp_str = [start_word_token]
        temp_pos = [start_pos_token]
        for s in tokenlist:
            temp_str.append(s['form'])
            temp_pos.append(s['upos'])
        sentences_list.append(temp_str)
        pos_list.append(temp_pos)
    return {
        'sentences_list': sentences_list,
        'pos_list': pos_list,
    }
# process the train and test data
train_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-train.conllu'
test_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-test.conllu'
start_w_token = '--s--'
start_pos_token = '--n--'
train_data_dict = arrange_dataset(train_file_path, start_w_token, start_pos_token)
test_data_dict = arrange_dataset(test_file_path, start_w_token, start_pos_token)
train_sentence, train_pos = train_data_dict['sentences_list'], train_data_dict['pos_list']
test_sentence, test_pos = test_data_dict['sentences_list'], test_data_dict['pos_list']
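A quick sanity check that the data loaded (a minimal sketch; the exact counts depend on the treebank version, so no output is shown):
print(len(train_sentence), len(test_sentence))  # number of train / test sentences
print(train_sentence[0][:5], train_pos[0][:5])  # first few tokens and their tags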
# vocab: map each word to an integer id
vocab = {}
cnt_word = 0
# count how many times each word appears
freq = defaultdict(int)
for sentence in train_sentence:
    for word in sentence:
        if word not in vocab:
            vocab[word] = cnt_word
            cnt_word += 1
        freq[word] += 1
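Note that freq is not used again in this post; in the original course the vocabulary keeps only words that appear at least twice and maps the rest to unk tokens, and freq is what would let you add that refinement here.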
print("字典:")
cnt = 0
for k, v in vocab.items():
print(f"{k}:{v}")
cnt += 1
if cnt > 20:
break
# output:
# vocabulary:
# --s--:0
# 看似:1
# 簡單:2
# ,:3
def assign_unk(word):
    # assign an unknown-word token; punctuation gets its own token
    punct = set(string.punctuation)
    if any(char in punct for char in word):
        return "--unk_punct--"
    return "--unk--"

def get_word_tag(word, pos_tag, vocab):
    # replace out-of-vocabulary words with an unk token, keep the tag
    if word not in vocab:
        word = assign_unk(word)
    return word, pos_tag
get_word_tag('tardigrade', 'NN', vocab)
# output: ('--unk--', 'NN')
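Unknown words containing punctuation get their own token. For example, assuming the string '@@' does not appear in the training vocabulary:
get_word_tag('@@', 'PUNCT', vocab)
# output: ('--unk_punct--', 'PUNCT')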
2. Implementing the HMM: building the three matrices it needs
Before computing these three matrices we first need three sets of counts (a small worked example follows the list):
* Transition counts: how often each POS tag is followed by another, stored in transition_counts
* Emission counts: how often each word occurs under each POS tag, stored in emission_counts
* Tag counts: how often each tag occurs, stored in tag_counts
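For intuition, here is how a single sentence updates the three counters, reusing the illustrative '天氣真好' lists from section 1 (tags assumed, not taken from the treebank):
# words: ['--s--', '天氣', '真', '好'], tags: ['--n--', 'NOUN', 'ADV', 'ADJ']
# transition_counts: ('--n--','NOUN'), ('NOUN','ADV'), ('ADV','ADJ') each +1
# emission_counts:   ('NOUN','天氣'), ('ADV','真'), ('ADJ','好') each +1
# tag_counts:        '--n--', 'NOUN', 'ADV', 'ADJ' each +1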
import pandas as pd
from collections import defaultdict
import math
import numpy as np
def create_three_HMM_matrix(sentences_list: list, pos_list: list) -> tuple:
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    sentence_len = len(sentences_list)
    i = 0
    for sentence_idx in range(sentence_len):
        # start from the sentence's first token (the start token)
        prev_tag = pos_list[sentence_idx][0]
        tag_counts[prev_tag] += 1
        for word_tag_idx in range(1, len(sentences_list[sentence_idx])):
            i += 1
            if i % 5000 == 0:
                print(f"word count = {i}")
            word, tag = get_word_tag(sentences_list[sentence_idx][word_tag_idx],
                                     pos_list[sentence_idx][word_tag_idx],
                                     vocab)
            transition_counts[(prev_tag, tag)] += 1
            emission_counts[(tag, word)] += 1
            tag_counts[tag] += 1
            prev_tag = tag
    return emission_counts, transition_counts, tag_counts
emission_counts, transition_counts, tag_counts = create_three_HMM_matrix(train_sentence, train_pos)
# get all the POS states
states = sorted(tag_counts.keys())
print(f"POS tags 數量: {len(states)}")
print(states)
# output: POS tags 數量: 16
# ['--n--', 'ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN', 'PUNCT', 'SYM', 'VERB', 'X']
print("transition 矩陣: ")
for ex in list(transition_counts.items())[:3]:
print(ex)
print()
print("emission 矩陣: ")
for ex in list(emission_counts.items())[200:203]:
print (ex)
# transition 矩陣:
# (('--n--', 'AUX'), 8)
# (('AUX', 'ADJ'), 217)
# (('ADJ', 'PUNCT'), 464)
# emission 矩陣:
# (('PUNCT', ':'), 86)
# (('PUNCT', '「'), 332)
# (('VERB', '吃'), 9)
def get_transition_matrix(alpha, tag_counts, transition_counts):
    # take out the sorted list of tags first
    all_tags = sorted(tag_counts.keys())
    # initialize the transition matrix; it holds tag-to-tag probabilities,
    # so its size must be (number of tags) x (number of tags)
    transition_matrix = np.zeros((len(all_tags), len(all_tags)))
    for i in range(len(all_tags)):
        for j in range(len(all_tags)):
            count = 0
            key = (all_tags[i], all_tags[j])
            # take the count for this tag pair, if it was observed
            if key in transition_counts:
                count = transition_counts[key]
            # count of the previous tag
            count_prev_tag = tag_counts[all_tags[i]]
            # alpha smoothing keeps unseen transitions from having zero probability
            transition_matrix[i, j] = (count + alpha) / (count_prev_tag + alpha * len(all_tags))
    return transition_matrix
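In other words, each entry is the add-alpha smoothed conditional probability of tag_j following tag_i:
P(tag_j | tag_i) = (C(tag_i, tag_j) + alpha) / (C(tag_i) + alpha * N), where N is the number of tags.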
alpha = 0.001
transition_matrix = get_transition_matrix(alpha, tag_counts, transition_counts)
print(f"transition_matrix [0][0]: {transition_matrix[0,0]:.9f}")
print(f"transition_matrix [3][1]: {transition_matrix[3,1]:.4f}")
print("transition matrix:")
transition_sub = pd.DataFrame(transition_matrix[0:4, 0:4], index=states[0:4], columns=states[0:4])
print(transition_sub)
# transition_matrix [0][0]: 0.000000250
# transition_matrix [3][1]: 0.0709
# transition matrix:
# --n-- ADJ ADP ADV
# --n-- 2.501866e-07 0.017513 0.130598 0.060545
# ADJ 4.086610e-07 0.024929 0.007356 0.016756
# ADP 2.205550e-07 0.024702 0.017424 0.034407
# ADV 2.181969e-07 0.070914 0.088152 0.089243
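Note that the '--n--' column is essentially zero in every row: no tag ever transitions back into the start token, so those cells contain only the smoothing mass alpha / (C(tag_i) + alpha * N).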
def get_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    # vocab is passed in as a list of words here, so vocab[j] is the j-th word
    num_tags = len(tag_counts)
    all_tags = sorted(tag_counts.keys())
    num_words = len(vocab)
    # initialize the emission matrix; it holds the probability of each word given
    # each tag, so its size is (number of tags) x (vocabulary size)
    emission_matrix = np.zeros((num_tags, num_words))
    for i in range(num_tags):
        for j in range(num_words):
            count = 0
            key = (all_tags[i], vocab[j])
            if key in emission_counts:
                count = emission_counts[key]
            count_tag = tag_counts[all_tags[i]]
            emission_matrix[i, j] = (count + alpha) / (count_tag + alpha * num_words)
    return emission_matrix
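Analogously, each emission entry is P(word_j | tag_i) = (C(tag_i, word_j) + alpha) / (C(tag_i) + alpha * V), where V is the vocabulary size.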
# Create the emission probability matrix; this takes a few minutes to run.
emission_matrix = get_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))
print(f"emission_matrix[0][0]: {emission_matrix[0,0]:.9f}")
print(f"emission_matrix[3][1]: {emission_matrix[3,1]:.9f}")
# Try viewing emissions for a few words in a sample dataframe
cidx = ['其實','決擇','出身', '10']
# Get the integer ID for each word
cols = [vocab[a] for a in cidx]
# Choose POS tags to show in a sample dataframe
rvals =['--n--', 'ADP', 'ADV', 'AUX']
# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]
# Get the emissions for the sample of words, and the sample of POS tags
emission_matrix_sub = pd.DataFrame(emission_matrix[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(emission_matrix_sub)
# emission_matrix[0][0]: 0.000000249
# emission_matrix[3][1]: 0.000000217
# 其實 決擇 出身 10
# --n-- 2.490900e-07 2.490900e-07 2.490900e-07 2.490900e-07
# ADP 2.197023e-07 2.197023e-07 2.197023e-07 2.197023e-07
# ADV 1.956478e-03 2.173623e-07 2.173623e-07 2.173623e-07
# AUX 3.447547e-07 3.447547e-07 3.447547e-07 3.447547e-07
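In this sample only the ('ADV', '其實') cell rises above the smoothing floor alpha / (C(tag) + alpha * V); every other (tag, word) pair shown was never observed in training, which is why those rows look constant.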
3. Implementing the Viterbi algorithm
Once we have emission_matrix and transition_matrix, we use the Viterbi algorithm to find the highest-probability tag path for a sentence. The algorithm has three parts: initialization, a forward pass, and a backward pass.
Initialization:
def initialize(states, tag_counts, transition_matrix, emission_matrix, corpus, vocab):
    num_tags = len(tag_counts)
    # best_probs[k, i]: best log probability of any tag path ending in tag k at word i
    # best_paths[k, i]: the previous tag on that best path
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    s_idx = states.index("--n--")
    for i in range(num_tags):
        if transition_matrix[s_idx, i] == 0:
            best_probs[i, 0] = float('-inf')
        else:
            best_probs[i, 0] = math.log(transition_matrix[s_idx, i]) + \
                               math.log(emission_matrix[i, vocab["--s--"]])
    return best_probs, best_paths
Only column 0 has values at this point (starting from the start token, it computes the probability of each POS tag that follows '--s--').
best_probs, best_paths = initialize(states, tag_counts, transition_matrix, emission_matrix, test_sentence[0], vocab)
# Test the function
print(f"best_probs[0,0]: {best_probs[0, 0]}")
print(f"best_paths[2,3]: {best_paths[2, 3]}")
# output:
# best_probs[0,0]: -30.40651015286206
# best_paths[2,3]: 0
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
    # A: transition matrix, B: emission matrix
    num_tags = best_probs.shape[0]
    for i in range(1, len(test_corpus)):
        for j in range(num_tags):
            best_prob_i = float('-inf')
            best_path_i = None
            # try every previous tag k and keep the best one
            # (note: this assumes every word of test_corpus is in vocab;
            # unseen words would first need to be mapped to a known token)
            for k in range(num_tags):
                prob = best_probs[k, i - 1] + math.log(A[k, j]) + \
                       math.log(B[j, vocab[test_corpus[i]]])
                if best_prob_i < prob:
                    best_prob_i = prob
                    best_path_i = k
            best_probs[j, i] = best_prob_i
            best_paths[j, i] = best_path_i
    return best_probs, best_paths
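Each cell applies the log-space Viterbi recurrence: best_probs[j, i] = max over k of (best_probs[k, i-1] + log A[k, j] + log B[j, word_i]), and best_paths[j, i] stores the k that achieves the maximum, i.e. the best previous tag.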
# update best_probs and best_paths
best_probs, best_paths = viterbi_forward(transition_matrix, emission_matrix, test_sentence[0], best_probs, best_paths, vocab)
def viterbi_backward(best_probs, best_paths, corpus, states):
    m = best_paths.shape[1]
    z = [None] * m
    num_tags = best_probs.shape[0]
    best_prob_for_last_word = float('-inf')
    pred = [None] * m
    # find the tag with the highest probability for the last word
    for k in range(num_tags):
        if best_probs[k][m - 1] > best_prob_for_last_word:
            best_prob_for_last_word = best_probs[k][m - 1]
            z[m - 1] = k
    pred[m - 1] = states[z[m - 1]]
    # walk backward through best_paths; stopping at index 1 keeps z[m - 1]
    # and pred[m - 1] from being overwritten when i would reach 0
    for i in range(m - 1, 0, -1):
        pos_tag_for_word_i = z[i]
        z[i - 1] = best_paths[pos_tag_for_word_i][i]
        pred[i - 1] = states[z[i - 1]]
    return pred
pred = viterbi_backward(best_probs, best_paths, test_sentence[0], states)
m = len(pred)
print('The prediction for pred[0:7] is: \n', pred[0:7], "\n", test_sentence[0][0:7])
# output:
# The prediction for pred[0:7] is:
# ['PRON', 'ADV', 'PUNCT', 'PRON', 'PART', 'NOUN', 'ADV']
# ['--s--', '然而', ',', '這樣', '的', '處理', '也']
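To score the result beyond eyeballing, you can compare this prediction against the gold tags of the same sentence (a minimal sketch; evaluating the whole test set would first require mapping out-of-vocabulary words to tokens the model knows):
correct = sum(p == t for p, t in zip(pred[1:], test_pos[0][1:]))  # skip the start token
print(f"accuracy on this sentence: {correct / (len(pred) - 1):.3f}")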
Today's post is on the long side. If the code is unfamiliar, trace through it slowly line by line; the Viterbi part in particular took me a long time to figure out. Link [1] points to the course's sample code, feel free to take a look. Tomorrow I will start covering vector representations of words and sentences~~
References
[1] Sample code